package com.open.cloud.rabbitmq.producer;

import com.open.cloud.rabbitmq.common.Constants;
import com.open.cloud.rabbitmq.common.DetailResponse;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author kong
 * @date 2021/4/29
 * blog: http://blog.kongyin.ltd
 */
@Slf4j
@Component
public class RabbitMQBuildMessageProducer {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageProducer buildMessageSender(final String exchange, final String routingKey, final String queue) throws IOException {
        return buildMessageSender(exchange,routingKey,queue, Constants.DIRECT_TYPE);
    }

    public MessageProducer bulidTopicMessageSender(final String exchange, final String routringKey) throws IOException {
        return buildMessageSender(exchange,routringKey,null,Constants.TOPIC_TYPE);
    }

    /**
     * 发送消息
     * @param exchange    消息交换机
     * @param routingKey  消息路由key
     * @param queue       消息队列
     * @param type        消息类型
     * return
     */
    public MessageProducer buildMessageSender(final String exchange,final String routingKey,
                                              final String queue,final String type) throws IOException {
        //创建链接
        Connection connection = connectionFactory.createConnection();
        //创建通道
        if (Constants.DIRECT_TYPE.equals(type)){
            buildQueue(exchange,routingKey,queue,connection,Constants.DIRECT_TYPE);
        }else if (Constants.TOPIC_TYPE.equals(type)){
            buildTopic(exchange,connection);
        }

        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setExchange(exchange);
        rabbitTemplate.setRoutingKey(routingKey);
        //设置message序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            if(!ack){
                //可以进行日志记录，异常处理，补偿处理等
                log.info("send message failed: " + cause + correlationData.toString());
            }else {
                //TODO 更新数据库，可靠性投递机制
            }
        }));

        rabbitTemplate.setReturnCallback(((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
            log.info("send message failed: " + replyCode + " " + replyText);
            rabbitTemplate.send(message);
        }));

        return new MessageProducer(){
            @Override
            public DetailResponse send(Object message) {
                return send(message);
            }
        };
    }

    private void buildQueue(String exchange, String routingKey,
                            final String queue, Connection connection, String type) throws IOException {
        Channel channel = connection.createChannel(false);
        if(Constants.DIRECT_TYPE.equals(type)){
            channel.exchangeDeclare(exchange,Constants.DIRECT_TYPE,true,false,null);
        }else if (Constants.TOPIC_TYPE.equals(type)){
            channel.exchangeDeclare(exchange,Constants.TOPIC_TYPE,true,false,null);
        }
        //声明
        channel.queueDeclare(queue,true,false,false,null);
        //构建
        channel.queueBind(queue,exchange,routingKey);
        try {
            //关闭
            channel.close();
        } catch (TimeoutException e) {
            log.info("close channel time out ", e);
        }
    }

    private void buildTopic(String exchange, Connection connection) throws IOException {
        Channel channel = connection.createChannel(false);
        channel.exchangeDeclare(exchange, Constants.TOPIC_TYPE, true, false, null);
    }
}
